Skip to content

Conversation

@tzolov
Copy link
Contributor

@tzolov tzolov commented Jun 24, 2025

This change improves the transport layer with reactive patterns and adds support for the latest MCP specification while maintaining backward compatibility with existing SSE transport.

  • Add HttpClientStreamableHttpTransport implementing 2025-03-26 MCP Streamable HTTP spec
  • Add ResponseSubscribers utility for handling SSE and JSON HTTP responses
  • Refactor HttpClientSseClientTransport to use reactive streams instead of CompletableFuture
    • Replace FlowSseClient with direct reactive stream handling
    • Use Disposable-based connection management instead of CountDownLatch
    • Replace message endpoint discovery with Sinks.One approach
  • Add resiliency tests using Toxiproxy for network failure scenarios
  • Minor type safety improvements in StdioClientTransport and DefaultMcpTransportStream

tzolov added 5 commits June 24, 2025 13:03
…to reactive streams

This change imporves the transport layer with reactive patterns and adds support for
the latest MCP specification while maintaining backward compatibility with existing SSE transport.

- Add HttpClientStreamableHttpTransport implementing 2025-03-26 MCP Streamable HTTP spec
- Add ResponseSubscribers utility for handling SSE and JSON HTTP responses
- Refactor HttpClientSseClientTransport to use reactive streams instead of CompletableFuture
  - Replace FlowSseClient with direct reactive stream handling
  - Use Disposable-based connection management instead of CountDownLatch
  - Replace message endpoint discovery with Sinks.One approach
- Add resiliency tests using Toxiproxy for network failure scenarios
- Minor type safety improvements in StdioClientTransport and DefaultMcpTransportStream

Signed-off-by: Christian Tzolov <[email protected]>
Signed-off-by: Christian Tzolov <[email protected]>
Signed-off-by: Christian Tzolov <[email protected]>
Signed-off-by: Christian Tzolov <[email protected]>
- Add proper exception handling with CompletableFuture.exceptionallyCompose for async HTTP operations
- Add test for specific exception type handling in resiliency tests

This change makes the HTTP client transports more robust by ensuring exceptions
are properly propagated.

Signed-off-by: Christian Tzolov <[email protected]>
Signed-off-by: Christian Tzolov <[email protected]>
@tzolov tzolov marked this pull request as ready for review June 26, 2025 10:16
tzolov added 2 commits June 26, 2025 13:59
- Replace exceptionallyCompose with whenComplete for better async error handling
- Make ResponseSubscribers class package-private

Signed-off-by: Christian Tzolov <[email protected]>
@tzolov tzolov changed the title feat: implement Streamable HTTP transport and refactor SSE transport to reactive streams feat: implement HttpClient Streamable HTTP transport Jul 1, 2025
@stantonk
Copy link
Contributor

stantonk commented Jul 1, 2025

@tzolov is there a companion PR that handles Streamable HTTP on the server side?

@stantonk
Copy link
Contributor

stantonk commented Jul 1, 2025

@tzolov oh wait, this may be it? #290

Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
@tzolov tzolov added this to the 0.11.0 milestone Jul 3, 2025
Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had some comments from when I started reviewing, so posting them here. Most of the concerns I did address by pushing two commits. Perhaps we can merge this and follow up with what's remaining as lower priority.

Comment on lines +49 to +95
static ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0").withNetwork(network)
.withExposedPorts(8474, 3000);

static Proxy proxy;

static {
container.start();

toxiproxy.start();

final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
try {
proxy = toxiproxyClient.createProxy("everything-server", "0.0.0.0:3000", "everything-server:3001");
}
catch (IOException e) {
throw new RuntimeException("Can't create proxy!", e);
}

final String ipAddressViaToxiproxy = toxiproxy.getHost();
final int portViaToxiproxy = toxiproxy.getMappedPort(3000);

host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
}

static void disconnect() {
long start = System.nanoTime();
try {
proxy.toxics().resetPeer("RESET_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
proxy.toxics().resetPeer("RESET_UPSTREAM", ToxicDirection.UPSTREAM, 0);
logger.info("Disconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
}
catch (IOException e) {
throw new RuntimeException("Failed to disconnect", e);
}
}

static void reconnect() {
long start = System.nanoTime();
try {
proxy.toxics().get("RESET_UPSTREAM").remove();
proxy.toxics().get("RESET_DOWNSTREAM").remove();
logger.info("Reconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
}
catch (IOException e) {
throw new RuntimeException("Failed to reconnect", e);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is copied from the resiliency tests, can we unify it in some shared test util?

disconnect();

// Veryfiy that the exception type is IOException and not TimeoutException
StepVerifier.create(mcpAsyncClient.ping()).expectError(IOException.class).verify();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


disconnect();

StepVerifier.create(mcpAsyncClient.ping()).expectError(CompletionException.class).verify();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we look at the cause to find an IOException maybe to make sure it's not a CompletableFuture's timeout?

* tools, resources, prompts, etc.
*
* @author Dariusz Jędrzejczyk
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add a note that it's a copy from mcp-test module as we do in other copied test classes to make sure we keep them in sync.


}

public static class BodylessResponseLineSubscriber extends BaseSubscriber<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static class BodylessResponseLineSubscriber extends BaseSubscriber<String> {
public static class BodilessResponseLineSubscriber extends BaseSubscriber<String> {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* Represents a Server-Sent Event with its standard fields.
*
* @param id the event ID, may be {@code null}
* @param event the event type, may be {@code null} (defaults to "message")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The record does not impose any default.

public static record SseEvent(String id, String event, String data) {
}

public record ResponseEvent(ResponseInfo responseInfo, SseEvent sseEvent, JSONRPCMessage jsonRpcMessage) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems responseInfo should not be null, we should add a constructor with a check.

Comment on lines +159 to +174
var matcher = EVENT_DATA_PATTERN.matcher(line);
if (matcher.find()) {
this.eventBuilder.append(matcher.group(1).trim()).append("\n");
}
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
this.currentEventId.set(matcher.group(1).trim());
}
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
this.currentEventType.set(matcher.group(1).trim());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, matchers are quite slow. Perhaps we can replace the data/id/event with line.substring(5).trim() where 5 is the length of "data:"?

Comment on lines 118 to 121
/**
* Initializes the subscription and sets up disposal callback.
* @param subscription the {@link Subscription} to the upstream line source
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need these javadocs on protected methods.

Comment on lines 139 to 144
/**
* Processes each line from the SSE stream according to the SSE protocol. Empty
* lines trigger event emission, other lines are parsed for data, id, or event
* type.
* @param line the line to process from the SSE stream
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This javadoc should only be needed at the top of the class.

chemicL and others added 3 commits July 3, 2025 20:18
Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
Signed-off-by: Christian Tzolov <[email protected]>
logger.debug("SSE connection established successfully");
}
})).onErrorComplete().subscribe();
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not necessary, since it's swallowed in the next operator. And that's ok since we emit the throwable to the sink. We can clean this up afterwards.

tzolov added a commit that referenced this pull request Jul 3, 2025
…r handling (#337)

- Add HttpClientStreamableHttpTransport implementing the 2025-03-26 MCP specification
- Add ResponseSubscribers utility for handling different HTTP response body types
- Refactor HttpClientSseClientTransport to use reactive streams instead of FlowSseClient
  - Remove FlowSseClient in favor of reactive stream patterns
  - Use Disposable-based connection management instead of CountDownLatch
  - Replace message endpoint discovery with Sinks.One approach
- Improve error handling by replacing doOnError/onErrorResume with onErrorComplete
- Add test coverage for new Streamable HTTP transport
- Add resiliency tests for connection handling and session invalidation

BREAKING CHANGE: FlowSseClient has been removed in favor of reactive streams

Signed-off-by: Christian Tzolov <[email protected]>

Co-authored-by: Dariusz Jędrzejczyk <[email protected]>
@tzolov
Copy link
Contributor Author

tzolov commented Jul 3, 2025

Rebased, squashed and merged at c711f83

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants